CREATE DATABASE if not exists tsensor;
create user if not exists 'sensor'@'localhost' identified by 'Whillmqq';
create user if not exists 'sensordataread'@'localhost' identified by 'Whill';

create table if not exists tsensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
	topic varchar(256) comment 'topic, unter den dieses Datum verwaltet wird',
	value bigint not null comment 'Sensordaten als Integer');
create table if not exists tsensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
	topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
	value bigint not null comment 'Sensordaten als Integer', index (timestamp,topicid));
create table if not exists tsensor.data_retent as select * from tsensor.data_storage;
create table if not exists tsensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
	value bigint not null comment 'Sensordaten als Integer', index (topicid));
create table if not exists tsensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
	pos tinyint not null comment 'Position innerhalb des Topics',
	minlength tinyint comment 'Mindestanzahl an elementen, die das Topic haben muss, damit die Regel greift');
create table if not exists tsensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
	topic varchar(256) not null unique comment 'Topic');
create table if not exists tsensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
	feld varchar(256) comment 'Bezeichnung des Datums',
	inhalt varchar(256) comment 'Inhalt des Datums',index (topicid,feld));
create or replace view tsensor.data_view as select utd.*,tl.topic,tdq.inhalt as quantity,tdd.inhalt as device,tdi.inhalt as internal_id from ((select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_storage ds) union (select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_retent ds)) utd
	join tsensor.topic_list tl on utd.topicid=tl.topicid
	left join tsensor.topic_def tdq on tdq.topicid=utd.topicid and tdq.feld='quantity'
	left join tsensor.topic_def tdd on tdd.topicid=utd.topicid and tdq.feld='device'
	left join tsensor.topic_def tdi on tdi.topicid=utd.topicid and tdq.feld='internal_id';

grant select,insert,update on tsensor.data_storage to 'sensor'@'localhost' with grant option;
grant insert,select,update on tsensor.last_data to 'sensor'@'localhost' with grant option;
grant insert,select on tsensor.topic_list to 'sensor'@'localhost' with grant option;
grant insert,select on tsensor.topic_def to 'sensor'@'localhost' with grant option;
grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
grant select on tsensor.data_storage to 'sensordataread'@'localhost' with grant option;
grant select on tsensor.data_view to 'sensordataread'@'localhost' with grant option;

delimiter //
create or replace procedure tsensor.retent_data()
begin
  declare tid bigint;
  declare btopicdone int;
  declare topic_curs cursor for select distinct topic_list.topicid from topic_list;
  declare continue handler for not found set btopicdone = 1;
  open topic_curs;
  repeat
    fetch topic_curs into tid;
    select count(*) into @actcount from data_storage where topicid=tid;
    if (@actcount > 2) then
      select min(timestamp) into @mts from (select timestamp from data_storage where topicid=tid order by timestamp desc limit 2) td ;
      insert into data_retent select * from data_storage where topicid=tid and timestamp < @mts;
      delete from data_storage where topicid=tid and timestamp < @mts;
      commit;
    end if;
  until btopicdone end repeat;
  close topic_curs;
end 
//
delimiter ;

DELIMITER //
CREATE OR REPLACE PROCEDURE tsensor.insert_topic(vtopic varchar(1024),newid int unsigned)
begin
 declare tpos tinyint;
 declare feldname varchar(1024);
 DECLARE bcfgDone INT;
 DECLARE cfg_curs CURSOR FOR SELECT cfg_topic.feld,cfg_topic.pos FROM cfg_topic where cfg_topic.minlength is null or cfg_topic.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
 DECLARE CONTINUE HANDLER FOR NOT FOUND SET bcfgDone = 1;
 OPEN cfg_curs;
 REPEAT
  fetch cfg_curs into feldname,tpos;
  if (tpos < 0) then
   select substring_index(substring_index(vtopic,'/',tpos),'/',1) into @feldinhalt;
  else
   select substring_index(substring_index(vtopic,'/',tpos),'/',-1) into @feldinhalt;
  end if;
  if (select count(*) from topic_def where topic_def.topicid = newid and topic_def.feld=feldname) = 0 then
   insert into topic_def (topicid,feld,inhalt) values (newid,feldname,@feldinhalt);
   commit;
  end if;
 UNTIL bcfgDone END REPEAT;
 close cfg_curs;
end
//
DELIMITER ;

DELIMITER //
CREATE OR REPLACE PROCEDURE tsensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
BEGIN
  select crc32(intopic) into @crctopic;
  if (@crctopic is null) then
    signal sqlstate '45000' set mysql_errno=32100,message_text='Missing topic';
  end if;
  if (select invalue is null) then
    signal sqlstate '45000' set mysql_errno=32100,message_text='Missing value';
  end if;
  if (select tstamp is null) then
    signal sqlstate '45000' set mysql_errno=32100,message_text='Missing timestamp';
  end if;
    if (@crctopic in (select topicid from topic_list))=0 then
      # new topic means no value existent
      call insert_topic(intopic,@crctopic);
      insert into topic_list (topicid,topic) values (@crctopic,intopic);
      insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
      insert into last_data (topicid,value) values (@crctopic,invalue);
    else
      # topic exists and values should be available
      if (invalue not in (select value from last_data where topicid=@crctopic)) then
	insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
        update last_data set value=invalue where topicid=@crctopic;
      else
	if (select count(distinct td.value) from (select data_storage.value from data_storage where topicid=@crctopic and (data_storage.timestamp > (unix_timestamp(now())-300)*1000) order by timestamp desc limit 2) td) = 1 then
          update data_storage set timestamp=tstamp where topicid=@crctopic and timestamp=(select max(timestamp) from data_storage where topicid=@crctopic);
        else
          insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
        end if; # check if second last match
      end if; # compare with last value
    end if; # topicid is null
    commit;
END
//
DELIMITER ;

grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
set global event_scheduler=ON;

create event  if not exists tsensor.retent_old_data on schedule every 1 hour do call tsensor.retent_data();

insert into tsensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
commit;
